-
Notifications
You must be signed in to change notification settings - Fork 396
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RecoveryHelper to speed up recovery after restart #471
base: master
Are you sure you want to change the base?
RecoveryHelper to speed up recovery after restart #471
Conversation
This patch introduces a change in WAL and recovery process. The idea is to avoid expensive scan of all table files and maintain a recovery record in the WAL instead. This record is written in the beginning of WAL log and it is not surrounded with 'begin and 'end' markers. There are following situations possible: a. There is no recovery record, there are normal records in WAL b. There is no recovery record, no other records in WAL c. There is a recovery record, there are normal records in WAL d. There is a recovery record, no other records in WAL Since recovery record is writted in the beginning, then it contains the latest offset only in a case when there is nothing else in the log, or other records are invalid(temp files are deleted). So in cases a,c and d recovery process will pick the committed file from WAL with highest offset - either from recovery record or from normal records. In case b when WAL log is empty or doesn't exist - latest offset will be discovered through full recoursive folder scan.
@confluentinc It looks like @justpresident just signed our Contributor License Agreement. 👍 Always at your service, clabot |
@kkonstantine would you please check this out? we've been running this in production for a while now |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@justpresident Thanks for making this PR. I'm curious what sort of speedup you are seeing in your environment?
return instance; | ||
} | ||
|
||
private final Map<TopicPartition, List<String>> files = new HashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this singleton instance will be accessed potentially by multiple threads when we have multiple tasks running on single worker. The map probably needs to be a ConcurrentHashMap
or access protected by locks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, correct. The way we launch it is quite special. We launch all instances in standalone mode in kubernetes, one worker per pod. So I have overlooked possibility of having multiple workers on the same machine. Fixed
@@ -128,6 +130,9 @@ public void apply() throws ConnectException { | |||
WALEntry mapKey = new WALEntry(key.getName()); | |||
WALEntry mapValue = new WALEntry(value.getName()); | |||
entries.put(mapKey, mapValue); | |||
if (value.getName().equals(RecoveryHelper.RECOVERY_RECORD_KEY)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this do anything? RECOVERY_RECORD_KEY
is written to the key here - wal.append(RecoveryHelper.RECOVERY_RECORD_KEY, fileStatusWithMaxOffset.getPath().toString())
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That was a mistake. Fixed it.
@@ -120,6 +121,7 @@ public void apply() throws ConnectException { | |||
for (Map.Entry<WALEntry, WALEntry> entry: entries.entrySet()) { | |||
String tempFile = entry.getKey().getName(); | |||
String committedFile = entry.getValue().getName(); | |||
RecoveryHelper.getInstance().addFile(committedFile); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am concerned that the lists in the map grow without bounds and eventually may cause OOM if the process is run long enough.
The speedup of course depends on the number of existing files in the table. The initial scan, that usually takes around 1 hour for large tables is eliminated completely. The startup is now instant |
Hello, |
I don't work with kafka-connect anymore and don't have such a setup with thousands of hdfs files to test, but it seems like the problem was solved in a very similar way in #556 |
Roman Studenikin seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you have already a GitHub account, please add the email address used for this commit to your account. You have signed the CLA already but the status is still pending? Let us recheck it. |
This patch introduces a change in WAL and recovery process.
The idea is to avoid expensive scan of all table files that could take longer than one hour for large tables and maintain a recovery record in the WAL instead. This record is written in the beginning of WAL log and it is not surrounded with 'begin and 'end' markers.
There are following situations possible:
a. There is no recovery record, there are normal records in WAL
b. There is no recovery record, no other records in WAL
c. There is a recovery record, there are normal records in WAL
d. There is a recovery record, no other records in WAL
Since recovery record is written in the beginning, then it contains the latest offset only in a case when there is nothing else in the log, or other records are invalid(temp files are deleted). So in cases a,c and d recovery process will pick the committed file from WAL with highest offset - either from recovery record or from normal records.
In case (b) when WAL log is empty or doesn't exist - latest offset will be discovered through full recursive folder scan.